Spark重点解析(一) => SparkCore

一. Spark 与 MapReduce 区别

Apache Spark™ is a fast and general engine for large-scale data processing.

与mapreduce比较

  • Spark大多数执行过程是基于内存的迭代

  • MapReduce 的 优点, SparkCore 都有

  • Hive 能做的操作, SparkSQL 都能做, 可以写 SQL 语句转换为 SparkCore 代码

  • Spark Streaming 提供近实时流

  • 超过80个类似于 map, reduce 这样的操作

  • 可以在Tachyon(基于内存的分布式的文件系统 (HDFS 是基于磁盘)) 上运行Spark, 会更快

image-20180725150106695


二. 什么是RDD

1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。

2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)

  • 如果读取的文件本来就存在于3个分区, 这些操作会并行操作, 如何并行操作? :TODO
  • 如果存在于3个分区, 手动规定了2个分区, 那么是如何工作的 ? :TODO

3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建

  • 从文件系统读取: local 或 HDFS sc.textFile("/Users/shixuanji/Documents/a.txt",2);
  • Hive 表: : TODO
  • 并行化的方式创建(多用于测试): val rdd = sc.makeRDD(1 to 10) 或者 val rdd = sc.parallelize(arr);

4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性 == 灵活)

下图中画橙色框的都是 RDD

image-20180725150715950

源码中的注释说明

1、A list of partitions:一组分片(Partition),即数据集的基本组成单位

2、A function for computing each split:一个计算每个分区的函数

3、A list of dependencies on other RDDs:RDD 之间的依赖关系

  • NarrowDependency 完全依赖, 窄依赖
  • ShuffleDependency 部分依赖, ‘’宽依赖’’

4、Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):

  • Partitioner: 自定义分区使用

5、Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):一个列表,存储存取每个 Partition 的优先位置(preferred location)。

Spark 中其它重要概念

Cluster Manager:Spark 的集群管理器,主要负责资源的分配与管理。集群管理器分配的资 源属于一级分配,它将各个 Worker 上的内存、CPU 等资源分配给应用程序,但是并不负责 对 Executor 的资源分配。目前,Standalone、YARN、Mesos、K8S,EC2 等都可以作为 Spark 的集群管理器。

Master:Spark 集群的主节点。

Worker:Spark 集群的工作节点。对 Spark 应用程序来说,由集群管理器分配得到资源的 Worker 节点主要负责以下工作:创建 Executor,将资源和任务进一步分配给 Executor,同步 资源信息给 Cluster Manager。

Executor:执行计算任务的一些进程。主要负责任务的执行以及与 Worker、Driver Application 的信息同步。

Driver Appication:客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换 为 RDD 和 DAG,并与 Cluster Manager 进行通信与调度。

关系:

image-20180726173416513


image-20180726173433286

1、用户使用 SparkContext 提供的 API(常用的有 textFile、sequenceFile、runJob、stop 等) 编写 Driver Application 程序。此外 SQLContext、HiveContext 及 StreamingContext 对 SparkContext 进行封装,并提供了 SQL、Hive 及流式计算相关的 API。

2、使用 SparkContext 提交的用户应用程序,首先会使用 BlockManager 和 BroadcastManager 将任务的 Hadoop 配置进行广播。然后由 DAGScheduler 将任务转换为 RDD 并组织成 DAG, DAG 还将被划分为不同的 Stage。最后由 TaskScheduler 借助 ActorSystem 将任务提交给集群 管理器(Cluster Manager)。

3、集群管理器(ClusterManager)给任务分配资源,即将具体任务分配到 Worker 上,Worker 创建 Executor 来处理任务的运行。Standalone、YARN、Mesos、EC2 等都可以作为 Spark 的集 群管理器。

注意: 如果是 –deploy-mode client 模式, client 就是 Driver, –deploy-mode cluster 模式, Driver 是由集群分配的一台 worker节点

三. Spark 的架构(standalone)

涉及到的名词: Driver, Master, Worker, Executor , Task

image-20180725155654170


四. Spark 任务提交

参考官网 http://spark.apache.org/docs/latest/submitting-applications.html

Client模式

不指定deploy-mode ,默认就是client模式,也就是哪一台服务器提交spark代码,那么哪一台就是driver服务器

Cluster模式

需要指定deploy-mode,driver服务器并不是提交代码的那一台服务器,而是在提交代码的时候,在worker主机上,随机挑选一台作为driver服务器,那么如果提交10个应用,那么就有可能10台driver服务器

–master spark://xxxxxx

需要启动 spark 集群

–master yarn

不需要启动 spark 集群, 提交的程序由 yarn 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000

# Run on a YARN cluster (公司常用)
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000

# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://xx.yy.zz.ww:443 \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
http://path/to/examples.jar \
1000

五. Transformation和action原理

Spark支持两种RDD操作:transformationaction

  • transformation操作会针对已有的RDD创建一个新的RDD
  • action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序

例如,map就是一种transformation操作,它用于将已有RDD的每个元素传入一个自定义的函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。而reduce就是一种action操作,它用于对RDD中的所有元素进行聚合操作,并获取一个最终的结果,然后返回给Driver程序。

transformation的特点就是lazy特性。lazy特性指的是,如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果

action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。这是action的特性。

image-20180725164407028


六. Transformation 和 Action 算子

1.Transformation 算子

Java 版代码, 见这里 & 上层目录

Scala 版代码, 见这里 & 上层目录

或 ../

map:

对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

filter:

对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD

flatMap:

和map差不多,但是flatMap生成的是多个结果

groupByKey, reduceByKey, sortByKey :

凡是这种.. ByKey 的, 必须传入一个对偶元祖, Java中是 JavaPairRdd

cogroup 与 join 与 union 区别:

详情可参考

  • cogroup
    • 相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。
    • return: JavaPairRDD[K, (JIterable[V], JIterable[W])
    • RDD的value是一个Pair的实例,这个实例包含两个Iterable的值, V表示的是RDD1中相同KEY的值, W表示的是RDD2中相同key的值.
  • join
    • 相当于SQL中的内关联join只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
  • leftOuterJoin / rigthOuterJoin
    • left: 左边的一定显示, 右边的 Join 上才显示
    • right: 右边的一定显示, 左边的 join 上才显示
  • union
    • 求rdd并集,但是不去重

Intersection,Distinct,Cartesian

  • intersection
    • intersection 求交集,提取两个rdd中都含有的元素。
    • Returns a new RDD that contains the intersection of elements in the source dataset and the argument.
  • Distinct (独特的,有区别的)
    • 去重
    • Return a new RDD containing the distinct elements in this RDD.
  • Cartesian (笛卡尔积)
    • 笛卡尔积, 全连接, 前后集合个数为a,b, a x b 种组合

mapPartition,reparation,coalesce

  • mapPartition

    • 该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多。
    • 比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection
  • coalesce

    • coalesce: 只能用于减少分区的数量,而且可以选择不发生shuffle 其实说白了他就是合并分区
    • repartition:可以增加分区,也可以减少分区必须会发生shuffle,相当于就是进行重新分区
  • reparation

    • reparition是coalesce shuffletrue的简易实现

sample 和 aggregateByKey

  • sample

    • 对RDD中的集合内元素进行采样,第一个参数withReplacement是true表示有放回取样,false表示无放回。第二个参数表示比例

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      /**
      *- @param withReplacement can elements be sampled multiple times (replaced when sampled out)
      @param fraction expected size of the sample as a fraction of this RDD's size
      seed 最好不要动
      */
      def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]
  • aggregateByKey

    • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) //按照key进行聚合
    • 其实reduceBykey 就是aggregateByKey的简化版。
    • aggregateByKey多提供了一个函数 seqOp 类似于Mapreduce的combine操作(就在map端执行reduce的操作)

mapPartitionsWithIndex 和 repartitionAndSortWithinPartitions

  • mapPartitionsWithIndex
    • 说白了就是可以打印出当前所在分区数
  • repartitionAndSortWithinPartitions
    • 该方法依据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序;通过对比sortByKey发现,这种方式比先分区,然后在每个分区中进行排序效率高,这是因为它可以将排序融入到shuffle阶段。

2. Action 算子

Java 代码见这里

reduce();

  • def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)

collect();

  • Return an array that contains all of the elements in this RDD.
  • this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

take(n);

  • Take the first num elements of the RDD. This currently scans the partitions one by one, so it will be slow if a lot of partitions are required. In that case, use collect() to get the whole RDD instead.
  • this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

count();

  • Return the number of elements in the RDD.

takeOrdered();

  • Returns the first k (smallest) elements from this RDD using the natural ordering for T while maintain the order.
  • top(n): 自然排序后,最大的前 n

saveAsTextFile();

  • Save this RDD as a text file, using string representations of elements.

countByKey();

  • return map<String, Integer>, key 为 key, value 为 key 的数量

takeSample();

  • withReplacement:元素可以多次(重复)抽样(在抽样时替换) 如果为 false, 在抽样数 > 样本数时, 只能返回样本数的样本
  • num:返回的样本的大小
  • seed:随机数生成器的种子
1
2
3
4
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T]

七. RDD 的持久化

将数据通过操作持久化(或缓存)在内存中是Spark的重要能力之一。当你缓存了一个RDD,每个节点都缓存了RDD的所有分区。这样就可以在内存中进行计算。这样可以使以后在RDD上的动作更快(通常可以提高10倍)。

你可以对希望缓存的RDD通过使用persist或cache方法进行标记。它通过动作操作第一次在RDD上进行计算后,它就会被缓存在节点上的内存中。Spark的缓存具有容错性,如果RDD的某一分区丢失,它会自动使用最初创建RDD时的转换操作进行重新计算。

另外,RDD可以被持久化成不同的级别。比如,可以允许你存储在磁盘,内存,甚至是序列化的Java对象(节省空间),备份在不同的节点上,或者存储在基于内存的文件系统Tachyon上。通过向persist()方法传递StorageLevel对象来设置。cache方法是使用默认级别StorageLevel.MEMORY_ONLY的方法。

选持久化方案建议:

  1. 优先选择MEMORY_ONLY,如果可以用内存缓存所有的数据,那么也就意味着我的计算是纯内存的计算,速度当然快。
  2. 如果MEMORY_ONLY 缓存不了所有的数据,MEMORY_ONLY_SER 把数据实现序列化然后进行存储。这样也是纯内存操作,速度也快,只不过需要耗费一点cpu资源需要反序列化。
  3. 也可以选用带_2这种方式, 此方式会存2份, 一份存在本地, 另一份会存到另外的节点。恢复速度的时候可以使用备份。
  4. 能不能使用DISK的,就不使用DISK,有时候从磁盘读,还不如从新计算一次。

关于tachyon

Spark2.0开始就不把tachyon(现在成为alluxio)集成在自身内部了, 依然可以直接用

基于内存的分布式文件系统

出现原因:

  • spark运行以 JVM为基础,所以spark的任务会把数据存入JVM的堆中,随着计算的迭代,JVM堆中存放的数据量迅速增大,对于spark而言,spark的计算引擎和存储引擎处在同一个JVM中,所以会有重复的GC方面的开销。这样就增大了系统的延时。
  • 当JVM崩溃时,缓存在JVM堆中的数据也会消失,这个时候spark不得不根据RDD的血缘关系重新计算数据。
  • 如果spark需要其他的框架的共享数据,比如就是hadoop的Mapreduce,这个时候就必须通过第三方来共享,比如借助HDFS,那么这样的话,就需要额外的开销,借助的是HDFS,那么就需要磁盘IO的开销。
  • 因为我们基于内存的分布式计算框架有以上的问题,那么就促使了内存分布式文件系统的诞生,比如tachyon。

Tachyon可以解决spark的什么问题呢?

如果我们把数据存放到tachyon上面:

  • 减少Spark GC的开销。
  • 当spark 的JVM崩溃的时候,存放在tachyon上的数据不受影响。
  • spark如果要想跟被的计算工具共享数据,只要通过tachyon的Client就可以做到了。并且延迟远低于HDFS等系统。

八. 广播变量 和 累加器

1. 广播变量

案例> 广播 ip 规则, 匹配 ip 十进制地址, 存入 mysql, 完整过程图示

每个 executor 拥有一份, 这个 executor 启动的 task 会共享这个变量

使用了广播变量之后, executor 中所有的 task 都会共享此变量, 否则每个 task 都会发一份

在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。

使用:

1
2
3
4
5
// 定义
val a = 3
val broadcast = sc.broadcast(a)
// 获取
val c = broadcast.value

image-20180726170621192

2. 累加器

使用场景: 异常监控,调试,记录符合某特性的数据的数目等

如果一个变量不被声明为一个累加器,那么它将在 被改变时不会在 driver 端进行全局汇总,即在分布式运行时每个 task 运行的只是原始变量的一个副本,并不能改变原始变量的值

但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

累加器在 Driver 端定义赋初始值,累加器只能在 Driver 端读取最后的值,在 Excutor 端更 新

使用:

1
2
3
4
// 定义
val a = sc.longAccumulator(0)
// 获取
val b = a.value

image-20180726170906622


九. Spark on Yarn 模式

配置: 只需要在conf/spark-env.sh 中配置export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop 就可以了

使用YARN模式的时候,不需要启动master和worker了。

只需要启动HDFS和YARN即可。

Standalone主要区别是: spark-submit后面的参数中--master后面的不是 spark://…., 而是 yarn, 这样:--master yarn

Spark on Yarn 模式注意点:

注意:如果你配置spark-on-yarn的client模式,其实会报错。

内部有一个内存检测机制

修改所有yarn节点的yarn-site.xml,在该文件中添加如下配置

1
2
3
4
5
6
7
8
9
10
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>


<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

–deploy-mode client

image-20180727005721062

–deploy-mode cluster

image-20180727005857420

十. 宽窄依赖

窄依赖是指父RDD的每个分区都只被子RDD一个分区使用。(独生, NarrowDependency)

宽依赖就是指父RDD的分区被多个子RDD的分区所依赖。 (超生, ShuffleDependency)

image-20180727010441959

十一. Stage 划分

开发完一个应用以后,把应用提交到集群,那么这个应用就叫做Application

这个应用里面我们开发了好多代码,这些代码里面凡是遇到一个action操作,就会产生一个job任务。

也就意味着,一个Application有一个或者一个以上的job任务。

然后这些job任务划分为不同stage去执行,stage里面就是运行不同的task任务。

遇到一个 shuffle 算子, 就会从中间分开, 划分为2个 stage

Task计算的就是分区上面的数据。

image-20180727012556575

十二. Spark 任务调度

Shuffle 机制见下一篇

1.简版

image-20180727013025763

2.完整版

注意:

  • AppClient / clientActor:在 Standalone 模式下的实现是 StandaloneAppClient
  • dirverActor: :TODO
  • 第7点, ApplicationDescription参数中封装的是一些系统信息, 和 用户设置的 cpu 的核数, 执行的线程数之类的数据
  • worker 端是在全图的 第 19步(action 算子触发后, 会提交 job )才知道, 去哪里计算数据.

十三. TopN 案例

对一个文件里面的单词进行单词计数,然后取前3个出现次数最多的三个单词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object TopN {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("TopN")
val sc=new SparkContext(conf);
val file=sc.textFile("file://...")
val topN=file.flatMap { line => line.split("\t") }
.map { word => (word,1) }
.reduceByKey(_+_) //key word value:count
.map(tuple =>(tuple._2,tuple._1)) // swap k,v
.sortByKey(false) // sort the count
.take(3) // take top 3
for( i <- topN){
println(i._2 + " 出现次数:"+i._1);
}
}
}

十四. 网站访问日志分析

需求分析

需求一 :

The average, min, and max content size of responses returned from the server.

需求二:

A count of response code’s returned.

需求三:

All IPAddresses that have accessed this server more than N times.

需求四:

The top endpoints requested by count. TopN 找出被访问次数最多的地址的前三个

Data

1
2
3
4
5
log.txt
------------
10.0.0.153#-#-#[12/Mar/2004:12:23:18-0800]#"GET /cgi-bin/mailgraph.cgi/mailgraph_3_err.png HTTP/1.1"#200#5554
10.0.0.153#-#-#[12/Mar/2004:12:23:40-0800]#"GET /dccstats/index.html HTTP/1.1"#304#2000
10.0.0.153#-#-#[12/Mar/2004:12:23:41-0800]#"GET /dccstats/stats-spam.1day.png HTTP/1.1"#200#2964

Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class ApacheAccesslog
-----------------------

case class ApacheAccesslog(
ipAddress:String, // ip地址
clientIndentd:String, //标识符
userId:String ,//用户ID
dateTime:String ,//时间
method:String ,//请求方式
endPoint:String ,//目标地址
protocol:String ,//协议
responseCode:Int ,//网页请求响应类型
contenSize:Long //内容长度

)

object ApacheAccesslog {
def parseLog(log:String):ApacheAccesslog={
val logArray= log.split("#");
val url=logArray(4).split(" ")

ApacheAccesslog(logArray(0),logArray(1),logArray(2),logArray(3),url(0),url(1),url(2),logArray(5).toInt,logArray(6).toLong);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class LogAnalyer
--------------------
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object LogAnalyer {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("LogAnalyer")
val sc=new SparkContext(conf);
val logsRDD=sc.textFile("file://....")
.map { line => ApacheAccesslog.parseLog(line) }
.cache()
/**
* 需求一
* The average, min, and max content size of responses returned from the server.
*/
val contextSize=logsRDD.map { log => log.contenSize }
// get max
val maxSize=contextSize.max()
//get min
val minSize=contextSize.min()
// total / count
//get average
val averageSize=contextSize.reduce(_+_)/contextSize.count()
println("=============================需求一-==============================");
println("最大值:"+maxSize + " 最小值:"+minSize + " 平均值:"+averageSize);
/**
* 需求二
* A count of response code's returned.
*/
println("=============================需求二-==============================");
logsRDD.map { log => (log.responseCode,1) }
.reduceByKey(_+_)
.foreach(result => println(" 响应状态:"+result._1 + " 出现的次数:"+result._2))
/**
* 需求三
* All IPAddresses that have accessed this server more than N times.
*/
println("=============================需求三-==============================");
val result= logsRDD.map { log =>( log.ipAddress,1) }
.reduceByKey(_+_)
.filter(result => result._2 > 1) // > 10000
.take(2) // > 10
for( tuple <- result){
println("ip : "+tuple._1 + " 出现的次数:"+tuple._2);
}
/**
* 需求四
* The top endpoints requested by count. TopN
*/
println("=============================需求四-==============================");
val topN=logsRDD.map { log => (log.endPoint,1) }
.reduceByKey(_+_)
.map(result => (result._2,result._1))
.sortByKey(false)
.take(2)
for(tuple <- topN){
println("目标地址 : "+tuple._2 + " 出现的次数:"+tuple._1);
}
logsRDD.unpersist(true)
}
}

十五. 线程池

3种线程池的构造方式见代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by zx on 2017/10/10.
*/
public class ThreadPoolDemo {

public static void main(String[] args) {

//创建一个单线程的线程池(池中只有一个线程)
//ExecutorService pool = Executors.newSingleThreadExecutor();

//固定大小的线程池(参数传几个就是几个)
//ExecutorService pool = Executors.newFixedThreadPool(5);


//可缓冲的线程词(可以有多个线程)
ExecutorService pool = Executors.newCachedThreadPool();

for(int i = 1; i <= 20; i ++) {

pool.execute(new Runnable() {
@Override
public void run() {
//打印当前线程的名字
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is over");
}
});

}

System.out.println("all Task is submitted");
//pool.shutdownNow();
}
}
如果帮到你, 可以给我赞助杯咖啡☕️
0%